[PySpark] - Add mapInPandas and mapInArrow methods to DataFrame class#325
Open
mariotaddeucci wants to merge 3 commits intoduckdb:mainfrom
Open
[PySpark] - Add mapInPandas and mapInArrow methods to DataFrame class#325mariotaddeucci wants to merge 3 commits intoduckdb:mainfrom
mariotaddeucci wants to merge 3 commits intoduckdb:mainfrom
Conversation
Contributor
Author
|
Hey @evertlammerts, will the main branch be updated, or should I point this PR to another branch? |
Collaborator
|
Hey @mariotaddeucci, there's a merge PR up at #351. As soon as that works you can rebase this. |
Contributor
There was a problem hiding this comment.
Pull request overview
Adds PySpark-compatible DataFrame.mapInPandas and DataFrame.mapInArrow APIs to DuckDB’s experimental Spark DataFrame implementation, along with typing support and tests.
Changes:
- Implement
mapInArrow(ArrowRecordBatchiterator in/out) andmapInPandas(pandasDataFrameiterator in/out) onDataFrame. - Add iterator-function typing aliases for Pandas/Arrow mapping functions.
- Add fast tests covering basic behavior, empty results, and a “no data loss” scenario; update DuckDB submodule revision.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| tests/fast/spark/test_spark_dataframe_map_in.py | Adds tests for mapInPandas/mapInArrow, including empty output and large dataset validation. |
| duckdb/experimental/spark/sql/dataframe.py | Implements mapInArrow and mapInPandas methods on DataFrame with docstrings and limited feature support. |
| duckdb/experimental/spark/_typing.py | Introduces PandasMapIterFunction and ArrowMapIterFunction type aliases. |
| external/duckdb | Bumps DuckDB submodule commit to pick up required functionality. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+1517
to
+1518
| ds = dataset(reader) # noqa: F841 | ||
| df = DataFrame(self.session.conn.sql("SELECT * FROM ds"), self.session) |
Comment on lines
+1548
to
+1550
| schema : :class:`pyspark.sql.types.DataType` or str | ||
| the return type of the `func` in PySpark. The value can be either a | ||
| :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string. |
Comment on lines
+1581
to
+1584
| >>> def mean_age(iterator): | ||
| ... for pdf in iterator: | ||
| ... yield pdf.groupby("id").mean().reset_index() | ||
| >>> df.mapInPandas(mean_age, "id: bigint, age: double").show() |
Comment on lines
+22
to
+24
| import pyarrow | ||
| from numpy import float32, float64, int32, int64, ndarray | ||
| from pandas import DataFrame as PandasDataFrame |
Comment on lines
+35
to
+39
| DataFrameLike = PandasDataFrame | ||
|
|
||
| PandasMapIterFunction = Callable[[Iterable[DataFrameLike]], Iterable[DataFrameLike]] | ||
|
|
||
| ArrowMapIterFunction = Callable[[Iterable[pyarrow.RecordBatch]], Iterable[pyarrow.RecordBatch]] |
Comment on lines
+50
to
+59
| n = 10_000_000 | ||
|
|
||
| pandas_df = pd.DataFrame( | ||
| { | ||
| "id": np.arange(n, dtype=np.int64), | ||
| "value_float": np.random.rand(n).astype(np.float32), | ||
| "value_int": np.random.randint(0, 1000, size=n, dtype=np.int32), | ||
| "category": np.random.randint(0, 10, size=n, dtype=np.int8), | ||
| } | ||
| ) |
Comment on lines
+66
to
+67
| generated_pandas_df = df.toPandas() | ||
| total_records = df.count() |
| total_records = df.count() | ||
|
|
||
| assert total_records == n | ||
| assert pandas_df["id"].equals(generated_pandas_df["id"]) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.